package org.bookie.stateful;

import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;

import org.apache.hivemind.ApplicationRuntimeException;
import org.apache.hivemind.HiveMind;
import org.apache.hivemind.events.RegistryShutdownListener;
import org.apache.hivemind.impl.ConstructableServicePoint;
import org.apache.hivemind.impl.ProxyUtils;
import org.apache.hivemind.impl.servicemodel.AbstractServiceModelImpl;
import org.apache.hivemind.internal.Module;
import org.apache.hivemind.service.ThreadEventNotifier;


/**
 * A Hivemind service model for services with (long-lasting,
 * i.e. request-spanning) state.
 * <p>
 * The implementation is currently based on the pooled service model: callers
 * receive a single shared proxy, and each invocation is delegated to the
 * {@link StatefulService} bound to the calling thread. That instance is looked
 * up in (or, when absent, created and stored into) the
 * {@link ClientStateStorage} under the service point's extension point id.
 * <p>
 * All methods that touch the model's mutable state are {@code synchronized}
 * on this model instance.
 *
 * @author Marcus Schulte
 */
public class StatefulServiceModel extends AbstractServiceModelImpl {

    /**
     * Name of a method in the deferred proxy that is used to obtain the constructed service.
     * Not referenced inside this class.
     */
    protected static final String SERVICE_ACCESSOR_METHOD_NAME = "_service";

    /** The outer proxy handed out by {@link #getService()}; built on first request. */
    private Object _serviceProxy;

    /** Notifier with which bound services are registered for thread cleanup; looked up lazily. */
    private ThreadEventNotifier _notifier;

    /**
     * The service wrapper bound to the current thread, if any. Created eagerly
     * so that unbinding never has to deal with a missing thread-local.
     */
    private final ThreadLocal<StatefulService> _activeService = new ThreadLocal<StatefulService>();

    /** Services currently bound to some thread; used to reject binding one instance twice. */
    private final Set<Object> _busyServices = new HashSet<Object>();

    /** Storage from which stateful services are retrieved and into which new ones are stored; looked up lazily. */
    private ClientStateStorage _stateStorage;

    /**
     * The service interface of the service point; used to decide whether a
     * freshly constructed core implementation needs a bridge proxy.
     *
     * @since 1.1
     */
    private final Class<?> _serviceInterface;

    /**
     * Shared, no-op implementation of {@link StatefulServiceLifecycleListener}:
     * every callback does nothing.
     */
    protected static final StatefulServiceLifecycleListener NULL_MANAGEABLE
            = new StatefulServiceLifecycleListener()
    {
        public void resumeConversation()
        {
        }

        public void pauseConversation()
        {
        }

        public void terminateConversation()
        {
        }
    };

    /**
     * Creates the model for the given service point and remembers the service
     * point's service interface.
     *
     * @param servicePoint the service point this model serves
     */
    public StatefulServiceModel(ConstructableServicePoint servicePoint)
    {
        super(servicePoint);

        _serviceInterface = servicePoint.getServiceInterface();
    }

    /**
     * Returns the shared service proxy, creating it (and looking up the
     * collaborating services) on first use.
     *
     * @return the outer proxy for the service
     */
    public synchronized Object getService()
    {
        ensureCollaborators();

        if (_serviceProxy == null)
            _serviceProxy = constructServiceProxy();

        return _serviceProxy;
    }

    /**
     * Looks up the thread event notifier and the client state storage from the
     * service point's module if that has not happened yet. Must only be called
     * while holding this model's monitor.
     */
    private void ensureCollaborators()
    {
        if (_notifier == null)
        {
            Module module = getServicePoint().getModule();

            _notifier = (ThreadEventNotifier) module.getService(
                    HiveMind.THREAD_EVENT_NOTIFIER_SERVICE,
                    ThreadEventNotifier.class);
        }

        if (_stateStorage == null)
        {
            _stateStorage = (ClientStateStorage) getServicePoint().getModule()
                    .getService(ClientStateStorage.class);
        }
    }

    /**
     * Constructs the service proxy and returns it, wrapped in any interceptors.
     * The resulting outer proxy is also registered as a registry shutdown
     * listener on the service point.
     */
    private Object constructServiceProxy()
    {
        ConstructableServicePoint servicePoint = getServicePoint();

        if (_log.isDebugEnabled())
            _log.debug("Creating PooledProxy for service " + servicePoint.getExtensionPointId());

        // The delegating proxy calls back into
        // getServiceImplementationForCurrentThread() on this model.
        Object proxy = ProxyUtils.createDelegatingProxy(
                "PooledProxy",
                this,
                "getServiceImplementationForCurrentThread",
                servicePoint);

        Object intercepted = addInterceptors(proxy);

        RegistryShutdownListener outerProxy = ProxyUtils
                .createOuterProxy(intercepted, servicePoint);

        servicePoint.addRegistryShutdownListener(outerProxy);

        return outerProxy;
    }

    /**
     * Returns the core service implementation for the calling thread. If no
     * service is bound to the thread yet, one is obtained from the client state
     * storage (or constructed and stored there), activated, registered for
     * thread cleanup and bound to the thread.
     *
     * @return the service object of the {@link StatefulService} bound to the current thread
     * @throws ConcurrentModificationException if the obtained service is already
     *         bound to another thread
     */
    public synchronized Object getServiceImplementationForCurrentThread()
    {
        // This method is reachable without a prior getService() call (see
        // instantiateService()), so make sure the collaborators exist.
        ensureCollaborators();

        StatefulService pooled = _activeService.get();

        if (pooled == null)
        {
            pooled = obtainPooledService();

            if (_busyServices.contains(pooled))
                throw new ConcurrentModificationException("Attempt to bind stateful service to two threads simultaneously.");

            // Activate first: if activation fails the service is not marked busy.
            pooled.activate();

            _notifier.addThreadCleanupListener(pooled);
            _activeService.set(pooled);
            _busyServices.add(pooled);
        }

        return pooled.getService();
    }

    /**
     * Fetches the stateful service from the client state storage, constructing
     * and storing a new one when the storage holds none.
     */
    private StatefulService obtainPooledService()
    {
        StatefulService svc = getServiceFromClientStateStorage();

        if (svc == null)
        {
            svc = constructStatefulService();
            storeServiceIntoClientStateStorage(svc);
        }

        return svc;
    }

    /**
     * Retrieves the service stored under this service point's extension point id.
     *
     * @return the stored service, or whatever the storage returns when nothing
     *         is stored (treated as absent by the caller when {@code null})
     */
    private synchronized StatefulService getServiceFromClientStateStorage()
    {
        return (StatefulService)
                _stateStorage.retrieve(getServicePoint().getExtensionPointId());
    }

    /**
     * Stores the given service under this service point's extension point id.
     */
    private synchronized void storeServiceIntoClientStateStorage(StatefulService pooled)
    {
        _stateStorage.store(getServicePoint().getExtensionPointId(), pooled);
    }

    /**
     * Constructs the core service implementation and wraps it in a new
     * {@link StatefulService}. A core that is not an instance of the service
     * interface is wrapped in a bridge proxy first.
     *
     * @throws ApplicationRuntimeException wrapping any exception raised during construction
     */
    private synchronized StatefulService constructStatefulService()
    {
        try
        {
            Object core = constructCoreServiceImplementation();

            // This is related to bean services.

            if (!_serviceInterface.isInstance(core))
                core = constructBridgeProxy(core);

            return new StatefulService(this, core);
        }
        catch (Exception ex)
        {
            throw new ApplicationRuntimeException(ex);
        }
    }

    /**
     * Detaches the given service from the calling thread: unregisters it from
     * thread cleanup notification, clears the thread binding, marks the service
     * as no longer busy and finally passivates it.
     *
     * @param pooled the service currently bound to the calling thread
     */
    protected synchronized void unbindPooledServiceFromCurrentThread(StatefulService pooled)
    {
        _notifier.removeThreadCleanupListener(pooled);

        // remove() instead of set(null): a subsequent get() still yields null,
        // but no stale entry is left behind in the thread's thread-local map.
        _activeService.remove();
        _busyServices.remove(pooled);

        pooled.passivate();
    }

    /**
     * Invokes {@link #getServiceImplementationForCurrentThread()} to instantiate an instance of the
     * service.
     */
    public void instantiateService()
    {
        getServiceImplementationForCurrentThread();
    }
}
